package onion.mqtt.server.processor;


import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.MqttMessageBuilders;
import io.netty.handler.codec.mqtt.MqttMessageIdVariableHeader;
import io.netty.handler.codec.mqtt.MqttUnsubscribeMessage;
import onion.mqtt.server.MqttServerBuilder;
import onion.mqtt.server.event.IMqttServerSubscribeListener;
import onion.mqtt.server.manager.SubscribeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;

/**
 * @author Mr, Lu
 * @developmentTeam 浙江允泽信息科技有限公司
 * @createTime 2023/12/12
 */
public class UnSubscribeProcessor extends AbstractMqttServerProcessor<MqttUnsubscribeMessage>{
    static final Logger log = LoggerFactory.getLogger(UnSubscribeProcessor.class);
    private final IMqttServerSubscribeListener subscribeEvent;

    public UnSubscribeProcessor(MqttServerBuilder serverBuilder) {
        this.subscribeEvent = serverBuilder.getSubscribeListener();
    }

    @Override
    public void process(Channel channel, MqttUnsubscribeMessage message) {
        log.debug("UnSubscribe clientId: {}", getClientId(channel));

        // 从订阅管理器中移除
        String clientId = getClientId(channel);
        List<String> topics = message.payload().topics();
        topics.forEach(topic -> {
            SubscribeManager.getInstance().removeSubscribeByClient(clientId, topic);

            // 通知订阅事件
            CompletableFuture.runAsync(() -> {
                try {
                    if (subscribeEvent != null) {
                        subscribeEvent.onUnsubscribe(channel, clientId, topic);
                    }
                } catch (Throwable e) {
                    log.error("unSubscribe publishEvent error clientId: {}.", clientId);
                }
            });
        });

        // 返回UnSubAck
        MqttMessageIdVariableHeader variableHeader = message.variableHeader();
        int messageId = variableHeader.messageId();
        writeAndFlush(channel, MqttMessageBuilders.unsubAck()
                .packetId(messageId)
                .build());
    }
}
